#include <fc/ntp.hpp>
#include <fc/network/udp_socket.hpp>
#include <fc/network/resolve.hpp>
#include <fc/network/ip.hpp>
#include <fc/thread/thread.hpp>
#include <boost/chrono/system_clocks.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <stdint.h>
#pragma once

#ifdef _MSC_VER
# include <stdlib.h>
# define bswap_64(x) _byteswap_uint64(x)
#elif defined(__APPLE__)
# include <libkern/OSByteOrder.h>
# define bswap_64(x) OSSwapInt64(x)
#else
# include <byteswap.h>
#endif

#include <array>
#include <atomic>
#include <cstring>
#include <mutex>

#include <fc/thread/mutex.hpp>

namespace fc
{
    namespace bch = boost::chrono;
  namespace detail {

	  class ntp_response_selector
	  {
	  public:
		  static fc::mutex mu;
		  static std::map<string, ntp_info> db;
		  static int64_t select(const ntp_info& info)
		  {
			  mu.lock();
			  db[info.from] = info;
			  auto now = time_point::now();
			  //half an hour
			  now -= microseconds(1800000000);
			  std::map<string, ntp_info>::iterator it = db.begin();

			  int64_t sum = 0;
			  int64_t count = 0;
			  int64_t max = 0;
			  int64_t min = 0;
			  while (it != db.end())
			  {
				  if (it->second._last_valid_ntp_reply_received_time < now)
				  {
					  it = db.erase(it);
				  }
				  else
				  {
					  auto delta = it->second._last_ntp_delta_microseconds;
					  sum += delta;
					  ++count;
					  if (min > delta)
						  min = delta;
					  if (max < delta)
						  max = delta;
					  it++;
				  }
			  }
			  mu.unlock();
			  if (count == 0)
				  return 0;
			  if (db.size() > 5)
			  {
				  sum -= max;
				  sum -= min;
				  count -= 2;
			  }
			  return sum / count;
		  }
		  static void clear()
		  {

			  mu.lock();
			  db.clear();
			  mu.unlock();
		  }
	  };
	  
	  fc::mutex ntp_response_selector::mu;
	  std::map<string, ntp_info> ntp_response_selector::db;
  class ntp_impl 
  {
    public:
      /** vector < host, port >  */
      fc::thread                                       _ntp_thread;
      std::vector< std::pair< std::string, uint16_t> > _ntp_hosts;
      fc::future<void>                                 _read_loop_done;
      udp_socket                                       _sock;
      uint32_t                                         _request_interval_sec;
      uint32_t                                         _retry_failed_request_interval_sec;
      fc::time_point                                   _last_valid_ntp_reply_received_time;

      static std::atomic_bool                                 _last_ntp_delta_initialized;
      static std::atomic<int64_t>                             _last_ntp_delta_microseconds;


      fc::future<void>                                 _request_time_task_done;

      ntp_impl() :
      _ntp_thread("ntp"),
      _request_interval_sec( 60*60 /* 1 hr */),
      _retry_failed_request_interval_sec(60 * 5)
      { 
		_last_ntp_delta_microseconds = 0;
        _last_ntp_delta_initialized = false;
        _ntp_hosts.push_back(std::make_pair("pool.ntp.org", 123));
		_ntp_hosts.push_back(std::make_pair("cn.ntp.org.cn", 123));
		_ntp_hosts.push_back(std::make_pair("edu.ntp.org.cn", 123));
		_ntp_hosts.push_back(std::make_pair("ntp1.aliyun.com", 123));
		_ntp_hosts.push_back(std::make_pair("ntp2.aliyun.com", 123));
		_ntp_hosts.push_back(std::make_pair("ntp3.aliyun.com", 123));
		_ntp_hosts.push_back(std::make_pair("ntp4.aliyun.com", 123));
		_ntp_hosts.push_back(std::make_pair("ntp5.aliyun.com", 123));
		_ntp_hosts.push_back(std::make_pair("ntp6.aliyun.com", 123));
		_ntp_hosts.push_back(std::make_pair("ntp7.aliyun.com", 123));
        
      } 

      ~ntp_impl() 
      { 
      }

      fc::time_point ntp_timestamp_to_fc_time_point(uint64_t ntp_timestamp_net_order)
      {
        uint64_t ntp_timestamp_host = bswap_64(ntp_timestamp_net_order);
        uint32_t fractional_seconds = ntp_timestamp_host & 0xffffffff;
        uint32_t microseconds = (uint32_t)((((uint64_t)fractional_seconds * 1000000) + (uint64_t(1) << 31)) >> 32);
        uint32_t seconds_since_1900 = ntp_timestamp_host >> 32;
        uint32_t seconds_since_epoch = seconds_since_1900 - 2208988800;
        return fc::time_point() + fc::seconds(seconds_since_epoch) + fc::microseconds(microseconds);
      }

      uint64_t fc_time_point_to_ntp_timestamp(const fc::time_point& fc_timestamp)
      {
        uint64_t microseconds_since_epoch = (uint64_t)fc_timestamp.time_since_epoch().count();
        uint32_t seconds_since_epoch = (uint32_t)(microseconds_since_epoch / 1000000);
        uint32_t seconds_since_1900 = seconds_since_epoch + 2208988800;
        uint32_t microseconds = microseconds_since_epoch % 1000000;
        uint32_t fractional_seconds = (uint32_t)((((uint64_t)microseconds << 32) + (uint64_t(1) << 31)) / 1000000);
        uint64_t ntp_timestamp_net_order = ((uint64_t)seconds_since_1900 << 32) + fractional_seconds;
        return bswap_64(ntp_timestamp_net_order);
      }

      void request_now()
      {
        assert(_ntp_thread.is_current());
        for( auto item : _ntp_hosts )
        {
          try 
          {
            //wlog( "resolving... ${r}", ("r", item) );
            auto eps = resolve( item.first, item.second );
            for( auto ep : eps )
            {
              //wlog( "sending request to ${ep}", ("ep",ep) );
              std::shared_ptr<char> send_buffer(new char[48], [](char* p){ delete[] p; });
              std::array<unsigned char, 48> packet_to_send { {0xdb,0,0,0,0,0,0,0,0} };
              memcpy(send_buffer.get(), packet_to_send.data(), packet_to_send.size());
              uint64_t* send_buf_as_64_array = (uint64_t*)send_buffer.get();
              send_buf_as_64_array[5] = fc_time_point_to_ntp_timestamp(fc::time_point::local_now()); // 5 = Transmit Timestamp
              _sock.send_to(send_buffer, packet_to_send.size(), ep);
              break;
            }
          } 
          catch (const fc::canceled_exception&)
          {
            throw;
          }
          // this could fail to resolve but we want to go on to other hosts..
          catch ( const fc::exception& e )
          {
            //elog( "${e}", ("e",e.to_detail_string() ) ); 
          }
        }
      } // request_now

      // started for first time in ntp() constructor, canceled in ~ntp() destructor
      // this task gets invoked every _retry_failed_request_interval_sec (currently 5 min), and if
      // _request_interval_sec (currently 1 hour) has passed since the last successful update, 
      // it sends a new request
      void request_time_task()
      {
        assert(_ntp_thread.is_current());
        if (_last_valid_ntp_reply_received_time <= fc::time_point::local_now() - fc::seconds(_request_interval_sec - 5))
          request_now();
        if (!_request_time_task_done.valid() || !_request_time_task_done.canceled())
          _request_time_task_done = schedule( [=](){ request_time_task(); }, 
                                              fc::time_point::local_now() + fc::seconds(_retry_failed_request_interval_sec), 
                                              "request_time_task" );
      } // request_loop

      void start_read_loop()
      {
        _read_loop_done = _ntp_thread.async( [this](){ read_loop(); }, "ntp_read_loop" );
      }

      void read_loop()
      {
        assert(_ntp_thread.is_current());

        uint32_t receive_buffer_size = sizeof(uint64_t) * 1024;
        std::shared_ptr<char> receive_buffer(new char[receive_buffer_size], [](char* p){ delete[] p; });
        uint64_t* recv_buf = (uint64_t*)receive_buffer.get();

        //outer while to restart read-loop if exception is thrown while waiting to receive on socket.
        while( !_read_loop_done.canceled() )
        {
          // if you start the read while loop here, the recieve_from call will throw "invalid argument" on win32,
          // so instead we start the loop after making our first request
          try 
          {
            _sock.open();
            request_time_task(); //this will re-send a time request

            while( !_read_loop_done.canceled() )
            {
              fc::ip::endpoint from;
              try
              {
                _sock.receive_from( receive_buffer, receive_buffer_size, from );
                //wlog("received ntp reply from ${from}",("from",from) );
              } FC_RETHROW_EXCEPTIONS(error, "Error reading from NTP socket");

              fc::time_point receive_time = fc::time_point::local_now();
              fc::time_point origin_time = ntp_timestamp_to_fc_time_point(recv_buf[3]);
              fc::time_point server_receive_time = ntp_timestamp_to_fc_time_point(recv_buf[4]);
              fc::time_point server_transmit_time = ntp_timestamp_to_fc_time_point(recv_buf[5]);

              fc::microseconds offset(((server_receive_time - origin_time) +
                                       (server_transmit_time - receive_time)).count() / 2);
              fc::microseconds round_trip_delay((receive_time - origin_time) -
                                                (server_transmit_time - server_receive_time));
              ////wlog("origin_time = ${origin_time}, server_receive_time = ${server_receive_time}, server_transmit_time = ${server_transmit_time}, receive_time = ${receive_time}",
              //     ("origin_time", origin_time)("server_receive_time", server_receive_time)("server_transmit_time", server_transmit_time)("receive_time", receive_time));
              //wlog("ntp offset: ${offset}, round_trip_delay ${delay}", ("offset", offset)("delay", round_trip_delay));

              //if the reply we just received has occurred more than a second after our last time request (it was more than a second ago since our last request)
              if( round_trip_delay > fc::seconds(1) )
              {
                //wlog("received stale ntp reply requested at ${request_time}, send a new time request", ("request_time", origin_time));
                request_now(); //request another reply and ignore this one
              }
              else //we think we have a timely reply, process it
              {
                if( offset < fc::seconds(60*60*24) && offset > fc::seconds(-60*60*24) )
                {
				  ntp_info info;
				  info.from = from.operator fc::string();
				  info._last_ntp_delta_initialized = true;
				  info._last_valid_ntp_reply_received_time = receive_time;
				  info._last_ntp_delta_microseconds = offset.count();
				  _last_ntp_delta_microseconds = ntp_response_selector::select(info);
                  _last_ntp_delta_initialized = true;
                  fc::microseconds ntp_delta_time = fc::microseconds(_last_ntp_delta_microseconds);
                  _last_valid_ntp_reply_received_time = receive_time;
                  //wlog("ntp_delta_time updated to ${delta_time}", ("delta_time",ntp_delta_time) );
                }
				/*else
                  elog( "NTP time and local time vary by more than a day! ntp:${ntp_time} local:${local}", 
                       ("ntp_time", receive_time + offset)("local", fc::time_point::local_now()) );
					   */
              }
            }
          } // try
          catch (fc::canceled_exception)
          {
            throw;
          }
          catch (const fc::exception& e)
          {
            //swallow any other exception and restart loop
            //elog("exception in read_loop, going to restart it. ${e}",("e",e));
          }
          catch (...)
          {
            //swallow any other exception and restart loop
            //elog("unknown exception in read_loop, going to restart it.");
          }
          _sock.close();
          fc::usleep(fc::seconds(_retry_failed_request_interval_sec));
        } //outer while loop
        //wlog("exiting ntp read_loop");
      } //end read_loop()
    }; //ntp_impl

	std::atomic_bool                                 ntp_impl::_last_ntp_delta_initialized;
	std::atomic<int64_t>                             ntp_impl::_last_ntp_delta_microseconds;
  } // namespace detail
  /// Creates the implementation object and immediately starts its receive loop
  /// on the NTP thread.
  ntp::ntp()
    : my( new detail::ntp_impl() )
  {
    my->start_read_loop();
  }

  /// Cancels the periodic request task and then the read loop, waiting for each
  /// to finish. Both cancellations run inside one task on the NTP thread, and the
  /// destructor blocks until that task is done.
  ntp::~ntp()
  {
    // Cancels `task` and waits for it; whatever is thrown while doing so is
    // deliberately ignored so that the second task is still shut down.
    auto cancel_quietly = [](fc::future<void>& task)
    {
      try
      {
        task.cancel_and_wait("ntp object is destructing");
      }
      catch (...)
      {
        //wlog( "Exception thrown while shutting down an NTP task, ignoring" );
      }
    };

    my->_ntp_thread.async([this, cancel_quietly](){
      cancel_quietly(my->_request_time_task_done);
      cancel_quietly(my->_read_loop_done);
    }, "ntp_shutdown_task").wait();
  }


  void ntp::add_server( const std::string& hostname, uint16_t port)
  {
    my->_ntp_thread.async( [=](){ my->_ntp_hosts.push_back( std::make_pair(hostname,port) ); }, "add_server" ).wait();
  }

  void ntp::set_request_interval( uint32_t interval_sec )
  {
    my->_request_interval_sec = interval_sec;
    my->_retry_failed_request_interval_sec = std::min(my->_retry_failed_request_interval_sec, interval_sec);
  }

  void ntp::request_now()
  {
    my->_ntp_thread.async( [=](){ my->request_now(); }, "request_now" ).wait();
  }

  void ntp::re_request_now()
  {
	  detail::ntp_response_selector::clear();
	  request_now();
  }

  /// Returns the system clock reading corrected by the current NTP offset, or an
  /// empty optional while no offset has been established yet.
  optional<time_point> ntp::get_time()const
  {
    if( !my->_last_ntp_delta_initialized )
      return optional<time_point>();

    // raw system clock, expressed in microseconds since its epoch
    const auto since_epoch = bch::duration_cast<bch::microseconds>(bch::system_clock::now().time_since_epoch());
    return microseconds(since_epoch.count()) + fc::microseconds(my->_last_ntp_delta_microseconds);
  }

  /// Returns the shared NTP offset in microseconds, or 0 while no offset has
  /// been established.
  int64_t ntp::get_delta_microseconds() 
  {
    if( !detail::ntp_impl::_last_ntp_delta_initialized )
      return 0;
    return detail::ntp_impl::_last_ntp_delta_microseconds;
  }
  /// Returns a snapshot of the current state: whether an offset is available,
  /// its value in microseconds, and when the last accepted reply arrived.
  /// The remaining fields of the result keep their default values.
  ntp_info ntp::get_ntp_info() const
  {
    ntp_info snapshot;
    snapshot._last_ntp_delta_initialized = my->_last_ntp_delta_initialized;
    snapshot._last_ntp_delta_microseconds = my->_last_ntp_delta_microseconds;
    snapshot._last_valid_ntp_reply_received_time = my->_last_valid_ntp_reply_received_time;
    return snapshot;
  }



} //namespace fc
